
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc};
use tokio::runtime::Runtime;
use crate::util::{AsyncSendData, async_data_channel_new, AsyncRecvData, AsyncRecv, AsyncSend, async_channel_new};
use crate::conf_init::{MapResult, MapErr, work_node_conf};
use crate::logical_plan::work_space::{WorkSpaceDebug, work_space_add_next, work_space_ctl_get};
use crate::run_plan::{HandResult, WindowResult, logical_plan_window_to_run_plan};
use crate::data_frame::{DataInstance,value::RealValue};
use crate::indicators::{indicators_key_ref, Op, IndicatorsMode, work_node_input_register};
use crate::logical_plan::work_node::conf::WorkNodeConf;
use crate::logical_plan::expr::window::{conf_map_to_window};
use crate::logical_plan::work_node::window::WindowInstance;
use crate::logical_plan::work_node::filter::FilterInstance;
use crate::logical_plan::work_node::func::FuncInstance;
use crate::logical_plan::expr::func::{logical_plan_expr_func_map_to_run_plan, logical_plan_expr_func_comb_map_to_run_plan};
use crate::data_frame::window::GroupData;
use async_recursion::async_recursion;
use tokio::sync::RwLock as TokioRwLock;
use tokio::sync::RwLockReadGuard as TokioRwLockReadGuard;
use tokio::sync::RwLockWriteGuard as TokioRwLockWriteGuard;
use once_cell::sync::OnceCell;

pub mod conf;
mod window;
mod filter;
mod func;

pub struct WorkNode{
    name: String,
    input: HashMap<String, AsyncRecvData>,
    /*
        当前WorkNode的入口判断节点，如果数据不满足过滤条件，当前节点不会处理该数据，
        并且不会把数据传给下一个节点
    */
    filter: Vec<(AsyncSendData, AsyncRecvData, Option<Box<FilterInstance>>)>,
    /*
        对进入的数据进行处理，并不会拦截数据，处理完的数据都会传给后续流程
    */
    func: Vec<(AsyncSendData, AsyncRecvData, Option<Box<FuncInstance>>)>,
    /*
        会拦截进入的数据，并且根据实际情况，输出WindowGroup（一组数据）
    */
    window: Vec<(AsyncSendData, AsyncRecvData, Option<Box<WindowInstance>>)>,
    next_channel: (AsyncSendData, AsyncRecvData),
    next: HashMap<String, AsyncSendData>,
    ctl_rcv: AsyncRecv<WorkNodeMsg>,
    runtime: Arc<Runtime>,
    debug: Arc<Option<WorkSpaceDebug>>,
    is_first: bool,
}

pub struct WorkNodeCtl{
    pub name: String,
    pub enable: bool,
    work_space: String,
    ctl_snd: AsyncSend<WorkNodeMsg>,
    work_node: Option<WorkNode>,
}

#[derive(Clone)]
enum WorkNodeMsg{
    AddNext(String, AsyncSendData),
}

static WORK_NODE_LIST: OnceCell<TokioRwLock<HashMap<String, WorkNodeCtl>>> = OnceCell::new();

pub async fn work_node_list_read<'a>()->TokioRwLockReadGuard<'a, HashMap<String, WorkNodeCtl>>{
    let tmp = WORK_NODE_LIST.get();

    let tmp = if let Some(tmp) = tmp{
        tmp
    }else{
        let value = match work_node_init_from_conf().await{
            MapResult::Ok(t) => {t},
            MapResult::Err(e) => {
                error!("sink_init_from_conf err: {:?}", e);
                std::process::exit(-1);
            },
        };
        WORK_NODE_LIST.get_or_init(move ||{value})
    };

    tmp.read().await
}

pub async fn work_node_list_write<'a>()->TokioRwLockWriteGuard<'a, HashMap<String, WorkNodeCtl>>{

    let tmp = WORK_NODE_LIST.get();

    let tmp = if let Some(tmp) = tmp{
        tmp
    }else{
        let value = match work_node_init_from_conf().await{
            MapResult::Ok(t) => {t},
            MapResult::Err(e) => {
                error!("sink_init_from_conf err: {:?}", e);
                std::process::exit(-1);
            },
        };
        let t = WORK_NODE_LIST.get_or_init(move ||{value});
        t
    };

    tmp.write().await
}

/*
    以深度优先的方式递归注册work_node节点
*/
#[async_recursion]
async fn work_node_reg_with_seq(work_node_name: &str,
                          work_node_seq: &HashMap<String,&WorkNodeConf>,
                          work_node_bak: &mut HashMap<String, WorkNodeCtl>,
                        is_online: bool)-> MapResult<()>{
    info!("work_node_reg_with_seq [{}]", work_node_name);

    if let Some(work_node_conf) = work_node_seq.get(work_node_name){
        for input in work_node_conf.input.iter(){
            if let Some(_) = work_node_bak.get_mut(input){
                continue;
            }
            if let Some(_) = work_node_seq.get(input){
                work_node_reg_with_seq(input, work_node_seq, work_node_bak, is_online).await?;
            }
        }
        work_node_reg(work_node_conf, work_node_bak, is_online).await?;
    }else{
        let err_desc = format!("work_node [{}] not exists when register", work_node_name);
        return MapResult::Err(MapErr::new(err_desc));
    }

    info!("work_node_reg_with_seq [{}] end", work_node_name);
    return MapResult::Ok(());
}

/*
    把所有即将注册的work_node节点放入临时的HashMap，然后进行DAG检测
*/
fn work_node_dag_check(work_node_seq: &HashMap<String, &WorkNodeConf>)-> MapResult<()>{
    //DAG检测的基本思想为：从任意节点出发，都不会回到原点

    for (work_node_name,_work_node_conf) in work_node_seq.iter(){
        let mut checked_seq_list = VecDeque::new();
        let mut checked_list = HashMap::new();

        work_node_dag_check_one(work_node_name, work_node_seq, &mut checked_list, &mut checked_seq_list)?;
    }

    return MapResult::Ok(());
}
fn work_node_dag_check_one(node_name: &str, work_node_seq: &HashMap<String, &WorkNodeConf>,
                           checked_list: &mut HashMap<String, ()>,
                           checked_seq_list: &mut VecDeque<String>)
                           -> MapResult<()>{
    //基于深度优先的DAG检测，同时要防止子序列存在环路导致检测过程死循环。
    if let Some(work_node) = work_node_seq.get(node_name){
        checked_seq_list.push_back(node_name.to_string());
        if let Some(_) = checked_list.insert(node_name.to_string(),()){
            //检测到环路了
            let mut err_result = MapErr::new(format!("dag checked"));

            for t in checked_seq_list.iter(){
                err_result.append(t.to_string());
            }
            return MapResult::Err(err_result);
        }

        for pre in work_node.input.iter(){
            work_node_dag_check_one(pre, work_node_seq, checked_list, checked_seq_list)?;
            checked_list.remove(pre);
            checked_seq_list.pop_back();
        }
        return MapResult::Ok(());
    }else{
        //touch the top work_node
        return MapResult::Ok(());
    }
}
/*
    用于一个work_space下的所有work_node节点在线注册
*/
pub async fn work_node_register_online(work_space_name: &str,
                                 work_node_list_conf: &[WorkNodeConf])->MapResult<()>{
    info!("work_node_register_online");

    let mut work_node_seq = HashMap::new();
    for work_node_conf in work_node_list_conf.iter(){
        work_node_seq.insert(work_node_conf.name.clone(), work_node_conf.clone());
    }
    work_node_dag_check(&work_node_seq)?;


    let mut work_node_list = HashMap::new();

    for (work_node_name, _work_node_conf) in work_node_seq.iter(){
        work_node_reg_with_seq(work_node_name, &work_node_seq, &mut work_node_list, true).await?;
    }

    let mut work_node_list_tmp= work_node_list_write().await;

    for (n,w) in work_node_list.iter(){
        if let Some(_) = work_node_list_tmp.get(n.as_str()){
            return MapResult::Err(MapErr::new(format!("work_node [{}] already exists", n)));
        }
    }

    for (_,w) in work_node_list.iter_mut(){
        work_node_run_one(w)?;
    }

    for (n,w) in work_node_list.into_iter(){
        work_node_list_tmp.insert(n,w);
    }

    return MapResult::Ok(());
}

async fn work_node_init_from_conf() -> MapResult<TokioRwLock<HashMap<String,WorkNodeCtl>>>{

    info!("work_node_init_from_conf");
    let work_node_conf = work_node_conf();

    let mut work_node_ctl_list = HashMap::new();
    let mut work_node_seq = HashMap::new();

    if let Some(ref work_node_conf) = work_node_conf{

        for one_work_node_conf in work_node_conf.iter(){
            if !one_work_node_conf.enable{
                info!("work_node {} not enable", one_work_node_conf.name);
                continue;
            }
            work_node_seq.insert(one_work_node_conf.name.clone(), one_work_node_conf.clone());
        }

        work_node_dag_check(&work_node_seq)?;

        for (work_node_name, _work_node_conf) in work_node_seq.iter(){
            work_node_reg_with_seq(work_node_name, &work_node_seq, &mut work_node_ctl_list, false).await?;
        }
    }

    info!("work_node_init_from_conf end");
    return MapResult::Ok(TokioRwLock::new(work_node_ctl_list));
}

async fn work_node_find_input(conf: &WorkNodeConf, is_online: bool)
    ->MapResult<(Option<HashMap<String, AsyncRecvData>>,String, bool)>{
    let mut work_space = None;
    let mut is_first = false;

    //input
    let mut input_list = HashMap::new();
    for input in conf.input.iter() {
        if let MapResult::Ok((r,work_space_tmp)) = work_space_add_next(input.as_str(), conf.name.as_str()).await {
            is_first = true;

            if let Some(r) = r {
                if let Some(ref work_space_ref) = work_space {
                    if work_space_ref != input {
                        let err_desc = format!("work_node [{}] input cannot belong to different work_space [{}] and [{}]", conf.name, input, work_space_ref);
                        return MapResult::Err(MapErr::new(err_desc));
                    }
                } else {
                    work_space = Some(input.to_string());
                }

                input_list.insert(input.to_string(), r);
            } else {
                info!("work_node [{}] input [{}] is work_space, but not enable",
                      conf.name, input.as_str());
                return MapResult::Ok((None,work_space_tmp,is_first));
            }
        } else {
            if is_online {
                match work_node_add_next_online(input.as_str(), conf.name.as_str()).await {
                    MapResult::Ok(wn) => {
                        if let Some((pre_work_space, r)) = wn {
                            if let Some(ref work_space_ref) = work_space {
                                if work_space_ref.as_str() != pre_work_space.as_str() {
                                    let err_desc = format!("work_node [{}] cannot belong to different work_space [{}] and [{}]", conf.name, pre_work_space, work_space_ref);
                                    return MapResult::Err(MapErr::new(err_desc));
                                }
                            } else {
                                work_space = Some(input.to_string());
                            }

                            input_list.insert(input.to_string(), r);
                        }else{
                            info!("work_node [{}] input [{}] is work_node, but not enable",
                                  conf.name, input.as_str());
                            if let Some(work_space) = work_space{
                                return MapResult::Ok((None,work_space,is_first));
                            }else{
                                return MapResult::Err(MapErr::new(
                                    format!("work_node [{}] no work_space", conf.name)));
                            }
                        }
                    }
                    MapResult::Err(e) => { return MapResult::Err(e); },
                }
            } else {
                match work_node_add_next(input.as_str(), conf.name.as_str()).await {
                    MapResult::Ok(wn) => {
                        if let Some((pre_work_space, r)) = wn {
                            if let Some(ref work_space_ref) = work_space {
                                if work_space_ref.as_str() != pre_work_space.as_str() {
                                    let err_desc = format!("work_node [{}] cannot belong to different work_space [{}] and [{}]", conf.name, pre_work_space, work_space_ref);
                                    return MapResult::Err(MapErr::new(err_desc));
                                }
                            } else {
                                work_space = Some(input.to_string());
                            }

                            input_list.insert(input.to_string(), r);
                        }else{
                            info!("work_node [{}] input [{}] is work_node, but not enable",
                                  conf.name, input.as_str());
                            if let Some(work_space) = work_space{
                                return MapResult::Ok((None,work_space,is_first));
                            }else{
                                return MapResult::Err(MapErr::new(
                                    format!("work_node [{}] no work_space", conf.name)));
                            }
                        }
                    },
                    MapResult::Err(e) => { return MapResult::Err(e); },
                }
            }
        }
    }

    if input_list.is_empty(){
        let err_desc = format!("work_node [{}] input is empty", conf.name);
        return MapResult::Err(MapErr::new(err_desc));
    }

    if let Some(work_space) = work_space{
        return MapResult::Ok((Some(input_list), work_space, is_first));
    }else{
        return MapResult::Err(MapErr::new(
            format!("work_node [{}] no work_space", conf.name)));
    }
}

async fn work_node_reg_one(conf: &WorkNodeConf,
                     node_list: &mut HashMap<String, WorkNodeCtl>,
                    r: AsyncRecv<WorkNodeMsg>,
                    is_online: bool,
                     input_list: HashMap<String,AsyncRecvData>,
                    work_space: &str, is_first: bool)
    ->MapResult<Option<WorkNode>>{

    if let Some(_) = node_list.get(&conf.name) {
        warn!("work_node register [{}] already exists", conf.name);
        return MapResult::Err(MapErr::new(format!("work_node [{}] already exists", conf.name)));
    }
    info!("work_node_reg_one [{}]", conf.name);

    //let mut pre_define = HashMap::new();
    //pre_define expr
    if let Some(pre_define_expr) = &conf.pre_define {
        for _t in pre_define_expr.iter() {
            //let pre_expr = conf_map_to_logicalplan(&t.desc)?;
            //pre_define.insert(t.name.to_string(), pre_expr);
        }
    }

    //filter
    info!("work_node [{}] init filter", conf.name);
    let mut filter_list = Vec::new();
    if let Some(ref filter_conf) = &conf.filter {
        info!("work_node [{}], conf parse filter [{}] [{}]", conf.name, filter_conf.name, filter_conf.desc);

        let filter = logical_plan_expr_func_comb_map_to_run_plan(filter_conf.name.as_str(), filter_conf.desc.as_str())?;
        let filter = FilterInstance::new(filter);
        let (s, r) = async_data_channel_new(format!("{}--{}", conf.name, filter_conf.name));
        filter_list.push((s, r, Some(filter)));
    }

    //func
    info!("work_node [{}] init func", conf.name);
    let mut func_list = Vec::new();
    if let Some(ref func) = &conf.func {
        for f in func.iter() {
            info!("work_node [{}], conf parse func [{}] [{}]", conf.name, f.name, f.desc);

            let condition = if let Some(ref condition) = f.condition {
                let condition = logical_plan_expr_func_comb_map_to_run_plan(f.name.as_str(), &condition)?;
                Some(condition)
            } else {
                None
            };

            let func = logical_plan_expr_func_map_to_run_plan(&f.desc)?;

            let (s, r) = async_data_channel_new(format!("{}--{}", conf.name, f.name));

            func_list.push((s, r, Some(FuncInstance::new(condition, func))));
        }
    }

    //window
    info!("work_node [{}] init window", conf.name);
    let mut window_list = Vec::new();
    if let Some(window_conf) = &conf.window {
        info!("work_node [{}], conf parse window", conf.name);

        let window = conf_map_to_window(window_conf)?;
        let window = logical_plan_window_to_run_plan(&window)?;

        let (s, r) = async_data_channel_new(format!("{}--{}", conf.name, window_conf.name));
        window_list.push((s, r, Some(WindowInstance::new(window))));
    }

    let (work_space_runtime, debug) = work_space_ctl_get(work_space).await?;

    let work_node = WorkNode {
        name: conf.name.to_string(),
        input: input_list,
        filter: filter_list,
        func: func_list,
        window: window_list,
        next_channel: async_data_channel_new(format!("{}--", conf.name)),
        next: HashMap::new(),
        ctl_rcv: r,
        runtime: work_space_runtime,
        debug: debug,
        is_first: is_first,
    };

    info!("work_node_reg_one [{}] end", conf.name);
    return MapResult::Ok(Some(work_node));
}
/*
    根据一个work_node节点的配置进行初始化，生成WorkNode，并且存入NodeList
    这里NodeList是全局的，因此要求所有的work_node节点名称都不能相同，重复的work_node会报失败
    work_node的input不允许为空
*/
async fn work_node_reg(conf: &WorkNodeConf,
                 node_list: &mut HashMap<String, WorkNodeCtl>,
                is_online: bool)-> MapResult<()>{
    info!("work_node register [{}]", conf.name);

    let (s, r) = async_channel_new();

    let (input_list,work_space, is_first) = work_node_find_input(conf, is_online).await?;

    let work_node = if conf.enable {
        if let Some(input_list) = input_list{
            work_node_reg_one(conf, node_list, r, is_online, input_list, work_space.as_str(), is_first).await?
        }else{
            None
        }
    }else{
        None
    };

    let mut enable = conf.enable;
    if let None = work_node{
        enable = false;
    }

    let work_node_ctl = WorkNodeCtl{
        name: conf.name.to_string(),
        enable: enable,
        work_space: work_space,
        ctl_snd: s,
        work_node: work_node,
    };

    node_list.insert(conf.name.to_string(), work_node_ctl);

    info!("work_node register [{}] end", conf.name);
    return MapResult::Ok(());
}

pub async fn work_node_add_next(work_node_name: &str, next_name: &str)
    ->MapResult<Option<(String, AsyncRecvData)>>{

    info!("work_node [{}] add next [{}]", work_node_name, next_name);
    let mut work_node_list = work_node_list_write().await;

    if let Some(work_node) = work_node_list.get_mut(work_node_name){
        if work_node.enable{
            let (s,r) = async_data_channel_new(format!("{}-{}", work_node_name, next_name));
            work_node.ctl_snd.send(WorkNodeMsg::AddNext(next_name.to_string(), s)).await;
            info!("work_node_add_next {} {} ok", work_node_name, next_name);
            return MapResult::Ok(Some((work_node.work_space.clone(),r)));
        }else{
            info!("work_node_add_next {} {} None", work_node_name, next_name);
            return MapResult::Ok(None);
        }
    }else{
        let err_desc = format!("work_node [{}] add next [{}], but not exist",
                               work_node_name, next_name);

        return MapResult::Err(MapErr::new(err_desc));
    }
}

pub async fn work_node_add_next_online(work_node_name: &str, next_name: &str)
    ->MapResult<Option<(String, AsyncRecvData)>>{

    info!("work_node_add_next_online {} {}", work_node_name,next_name);
    let work_node_list = work_node_list_read().await;

    if let Some(work_node) = work_node_list.get(work_node_name){
        if work_node.enable{
            let (s,r) = async_data_channel_new(format!("{}-{}", work_node_name, next_name));
            work_node.ctl_snd.send(WorkNodeMsg::AddNext(next_name.to_string(), s)).await;
            return MapResult::Ok(Some((work_node.work_space.clone(), r)));
        }else{
            return MapResult::Ok(None);
        }
    }else {
        return MapResult::Err(MapErr::new(format!("work_node not exists")));
    }
}

pub async fn work_node_get_work_space_name(work_node_name: &str)
    ->Option<String>{

    let work_node_list = work_node_list_read().await;

    if let Some(work_node) = work_node_list.get(work_node_name){
        return Some(work_node.work_space.clone());
    }else {
        return None;
    }
}

pub async fn work_node_run()->MapResult<()>{
    info!("work_node_run");

    let mut work_node_list = work_node_list_write().await;

    for (_, work_node) in work_node_list.iter_mut(){
        work_node_run_one(work_node)?;
    }

    info!("work_node_run end");
    return MapResult::Ok(());
}

fn work_node_run_one(work_node_ctl: &mut WorkNodeCtl)->MapResult<()>{

    info!("work_node_run_one {}", work_node_ctl.name);

    /*
    每一个work_node：
        1）每一个input都启动一个异步feature，负责把数据分发给第一个filter或者第一个func或者next channel
        2）每一个filter都启动一个异步feature，负责数据的判断和发给下一个filter或者第一个func或者next channel
        3）每一个func都启动一个异步feature，负责本func的condition判断和hand处理，并把数据发给下一个func或者next channel
        4）启动一个feature负责把next channel的数据传给所有next
    */

    //each work_node

    if work_node_ctl.enable{
        if let Some(ref mut work_node) = work_node_ctl.work_node{
            let mut filter_list = Vec::new();
            for (s,r,_) in work_node.filter.iter(){
                filter_list.push((s.clone(),r.clone()));
            }

            let mut func_list = Vec::new();
            for (s,r,_) in work_node.func.iter(){
                func_list.push((s.clone(),r.clone()));
            }

            let mut window_list = Vec::new();
            for (s,r,_) in work_node.window.iter(){
                window_list.push((s.clone(),r.clone()));
            }

            //each input
            for (input_name,input) in work_node.input.iter(){
                let work_node_name = work_node.name.clone();
                let debug = Arc::clone(&work_node.debug);
                let is_first = work_node.is_first;
                let input = input.clone();
                let input_name = input_name.to_string();
                let next_send = match filter_list.first(){
                    Some((s,_)) => {s.clone()},
                    None => {
                        match func_list.first(){
                            Some((s,_)) => {s.clone()},
                            None => {
                                if let Some((s,_)) = &window_list.first(){
                                    s.clone()
                                }else{
                                    work_node.next_channel.0.clone()
                                }
                            },
                        }
                    },
                };

                work_node.runtime.spawn(
                    work_node_input(work_node_name, input_name, input, next_send, debug, is_first)
                );
            }

            //each filter
            for (index,(_,r,filter)) in work_node.filter.iter_mut().enumerate(){
                let work_node_name = work_node.name.clone();
                let debug = Arc::clone(&work_node.debug);
                let filter = match filter.take(){
                    Some(t) => {t},
                    None => {
                        let err_desc = format!("work_node {} filter is None when run", work_node_name);
                        return MapResult::Err(MapErr::new(err_desc));
                    },
                };
                let input = r.clone();
                let next_send = if let Some((s,_)) = filter_list.get(index+1){
                    s.clone()
                }else{
                    match func_list.first(){
                        Some((s,_)) => {s.clone()},
                        None => {
                            if let Some((s,_)) = &window_list.first(){
                                s.clone()
                            }else{
                                work_node.next_channel.0.clone()
                            }
                        },
                    }
                };

                work_node.runtime.spawn(
                    work_node_filter(work_node_name, input, filter, next_send, debug)
                );
            }

            //each func
            for (index,(_,r,func)) in work_node.func.iter_mut().enumerate(){
                let work_node_name = work_node.name.clone();
                let debug = Arc::clone(&work_node.debug);
                let func = match func.take(){
                    Some(t) => {t},
                    None => {
                        let err_desc = format!("work_node {} func is None when run", work_node_name);
                        return MapResult::Err(MapErr::new(err_desc));
                    },
                };
                let input = r.clone();
                let next_send = if let Some((s,_)) = func_list.get(index+1){
                    s.clone()
                }else{
                    if let Some((s,_)) = window_list.first(){
                        s.clone()
                    }else{
                        work_node.next_channel.0.clone()
                    }
                };

                work_node.runtime.spawn(
                    work_node_func(work_node_name, input, func, next_send, debug)
                );
            }

            //window
            for (index,(_,r,window)) in work_node.window.iter_mut().enumerate(){

                let work_node_name = work_node.name.clone();
                let debug = Arc::clone(&work_node.debug);
                let window = match window.take(){
                    Some(t) => {t},
                    None => {
                        let err_desc = format!("work_node {} window is None when run", work_node_name);
                        return MapResult::Err(MapErr::new(err_desc));
                    },
                };
                let input = r.clone();

                let next_send = if let Some((s,_)) = window_list.get(index+1){
                    s.clone()
                }else{
                    work_node.next_channel.0.clone()
                };

                work_node.runtime.spawn(
                    work_node_window(work_node_name, input, window, next_send, debug)
                );
            }

            //next
            let next = work_node.next.clone();
            let input = work_node.next_channel.1.clone();
            let ctl_rcv = work_node.ctl_rcv.clone();

            work_node.runtime.spawn(
                work_node_next(input, next, ctl_rcv)
            );
        }else{
            error!("work_node [{}] is enable, but work_node is None", work_node_ctl.name);
            std::process::exit(-1);
        }
    }else{
        info!("work_node [{}] is not enable", work_node_ctl.name);
    }

    return MapResult::Ok(());
}
async fn work_node_window(_work_node_name: String,
                          input: AsyncRecvData,
                          mut window: Box<WindowInstance>,
                          next: AsyncSendData,
                          _debug: Arc<Option<WorkSpaceDebug>>){

    loop{
        if let Some(data) = input.recv().await{
            match data{
                DataInstance::BatchData(batch_data) => {
                    for data in batch_data.into_iter(){
                        match window.window.evaluate(data).await{
                            WindowResult::Ok(window_group) => {
                                if let Some(window_group) = window_group{
                                    next.send(DataInstance::GroupData(GroupData::new_with_window_group(window_group))).await;
                                }
                            },
                            WindowResult::Err(_) => {},
                        }
                    }
                },
                DataInstance::GroupData(_) => {},
            }
        }
    }
}
async fn work_node_func(work_node_name: String, input: AsyncRecvData, mut func: Box<FuncInstance>,
                        next: AsyncSendData, _debug: Arc<Option<WorkSpaceDebug>>){

    loop{
        if let Some(mut data) = input.recv().await{

            let mut condition_result = Vec::new();

            if let Some(ref mut condition) = func.condition{

                //这里做一个约定，只要有condition，那么只能是condition正确执行，并且结果是bool true的时候，才能执行
                let condition_ret = condition.evaluate(&mut data).await;
                for r in condition_ret.iter(){
                    match r{
                        HandResult::Err(_) => {
                            condition_result.push(false);
                        },
                        HandResult::Ok(r) => {
                            if let RealValue::Boolean(r) = r{
                                if *r{
                                    condition_result.push(true);
                                }else{
                                    condition_result.push(false);
                                }
                            }else{
                                condition_result.push(false);
                            }
                        },
                    }
                }
            }

            if !condition_result.is_empty(){
                match data{
                    DataInstance::BatchData(batch_data) => {
                        if batch_data.len() == condition_result.len(){
                            let mut other = Vec::new();
                            let mut tmp = Vec::new();
                            for (idx,d) in batch_data.into_iter().enumerate(){
                                if condition_result[idx]{
                                    tmp.push(d);
                                }else{
                                    other.push(d);
                                }
                            }
                            if tmp.len() > 0{
                                let mut data = DataInstance::BatchData(tmp);
                                /*
                                    work_node的递归函数处理结果貌似暂时不需要处理
                                */
                                let _func_restult = func.func.evaluate(&mut data).await;
                                next.send(data).await;
                            }
                            if other.len() > 0{
                                /*
                                    这里这个解决方案不太优雅，破坏了原来数据的顺序，后面调优性能的时候再优化一下。
                                */
                                let data = DataInstance::BatchData(other);
                                next.send(data).await;
                            }
                        }else{
                            indicators_key_ref(Op::Add, IndicatorsMode::WorkNode,
                                               work_node_name.as_str(), "func batch len err", 1);
                        }
                    },
                    DataInstance::GroupData(group_data) => {
                        if group_data.len() == condition_result.len(){
                            let mut tmp = GroupData::new();
                            for (idx,d) in group_data.into_iter().enumerate(){
                                if condition_result[idx]{
                                    tmp.push(d);
                                }
                            }
                            if tmp.len() > 0{
                                let mut data = DataInstance::GroupData(tmp);
                                /*
                                    work_node的递归函数处理结果貌似暂时不需要处理
                                */
                                let _func_restult = func.func.evaluate(&mut data).await;
                                next.send(data).await;
                            }
                        }else{
                            indicators_key_ref(Op::Add, IndicatorsMode::WorkNode,
                                               work_node_name.as_str(), "func batch len err", 1);
                        }
                    },
                }
            }else{
                next.send(data).await;
            }
        }
    }
}

/*
    send data to node next
*/
async fn work_node_next(input: AsyncRecvData, mut next: HashMap<String, AsyncSendData>, ctl_msg: AsyncRecv<WorkNodeMsg>){

    loop{
        tokio::select! {
            msg = ctl_msg.recv() => {
                match msg{
                    WorkNodeMsg::AddNext(next_name, s) => {
                        next.insert(next_name, s);
                    },
                }
            },
            data = input.recv() => {
                if let Some(data) = data{
                    for (_,next) in next.iter(){
                        next.send(data.clone()).await;
                    }
                }
            },
        }
    }
}

async fn work_node_filter(work_node_name: String, input: AsyncRecvData, mut filter: Box<FilterInstance>,
                          next: AsyncSendData, _debug: Arc<Option<WorkSpaceDebug>>){
    loop{
        if let Some(mut data) = input.recv().await{
            let mut condition_result = Vec::new();

            let filter_result = filter.func.evaluate(&mut data).await;
            for r in filter_result.iter(){
                match r{
                    HandResult::Err(_) => {
                        condition_result.push(false);
                    },
                    HandResult::Ok(r) => {
                        if let RealValue::Boolean(r) = r{
                            if *r{
                                condition_result.push(true);
                            }else{
                                condition_result.push(false);
                            }
                        }else{
                            condition_result.push(false);
                        }
                    },
                }
            }

            match data {
                DataInstance::BatchData(batch_data) => {
                    if batch_data.len() == condition_result.len() {
                        let mut tmp = Vec::new();
                        for (idx, d) in batch_data.into_iter().enumerate() {
                            if condition_result[idx] {
                                tmp.push(d);
                            }
                        }
                        if tmp.len() > 0 {
                            let data = DataInstance::BatchData(tmp);
                            next.send(data).await;
                        }
                    } else {
                        indicators_key_ref(Op::Add, IndicatorsMode::WorkNode,
                                           work_node_name.as_str(), "func batch len err", 1);
                    }
                },
                DataInstance::GroupData(group_data) => {
                    if group_data.len() == condition_result.len() {
                        let mut tmp = GroupData::new();
                        for (idx, d) in group_data.into_iter().enumerate() {
                            if condition_result[idx] {
                                tmp.push(d);
                            }
                        }
                        if tmp.len() > 0 {
                            let data = DataInstance::GroupData(tmp);
                            next.send(data).await;
                        }
                    } else {
                        indicators_key_ref(Op::Add, IndicatorsMode::WorkNode,
                                           work_node_name.as_str(), "batch len err", 1);
                    }
                },
            }
        }
    }
}

async fn work_node_input(work_node_name: String, input_name: String, input: AsyncRecvData, next: AsyncSendData,
                         _debug: Arc<Option<WorkSpaceDebug>>, is_first: bool){

    work_node_input_register(input_name.as_str(), input.clone()).await;
    let work_node_name = Arc::new(work_node_name);

    loop{
        if let Some(mut data) = input.recv().await{
            indicators_key_ref(Op::Add, IndicatorsMode::WorkNode, work_node_name.as_str(),"recv", 1);
            if let DataInstance::BatchData(ref mut batch_data) = data{
                for data in batch_data.iter_mut(){
                    data.metadata.work_node = Arc::clone(&work_node_name);
                }
            }
            next.send(data).await;
        }
    }
}
